package com.手写线程池.阻塞队列;

import com.手写线程池.拒绝策略设计.RejectPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Fang Ruichuan
 * @date 2023-01-28 20:52
 */
public class BlockingQueue<T> {

    private Logger logger = LoggerFactory.getLogger(BlockingQueue.class);

    // 容量
    private Integer capacity;

    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    // 尝试添加任务
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            // 如果队列超过容量
            if (deque.size() > capacity) {
                logger.debug("task too much, do reject");
                rejectPolicy.reject(this, task);
            } else {
                deque.offer(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    // 阻塞的方式添加任务
    public void put(T task) {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.size() >= capacity) {
                logger.debug("wait to add queue");
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.offer(task);
            logger.debug("task add successfully");
            emptyWaitSet.signal();
        } finally {
            lock.unlock();
        }
    }

    // 阻塞获取任务
    public T take() {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.isEmpty()) {
                try {
                    logger.debug("wait to take task");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            T task = deque.poll();
            logger.debug("take task successfully");
            // 从队列中获取元素
            return task;
        } finally {
            lock.unlock();
        }
    }


}
